# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Light classes to handle parameters for classes construction.
.. currentmodule hysop.tools
* :class:`~MPIParams`
* :class:`~CartesianDiscretization`
"""
import hashlib
from collections import namedtuple
from hysop.constants import HYSOP_DEFAULT_TASK_ID, BoundaryCondition
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.hash import hash_communicator
from hysop.tools.numpywrappers import npw
from hysop.core.mpi import main_comm, main_rank, MPI
[docs]
class MPIParams(
    namedtuple("MPIParams", ["comm", "size", "task_id", "rank", "on_task"])
):
    """
    Immutable struct to save mpi parameters.

    Fields
    ------
    comm :
        Parent mpi communicator (default = main_comm). Replaced by
        MPI.COMM_NULL when on_task is False.
    size :
        comm.Get_size(), or -1 when on_task is False or comm is MPI.COMM_NULL.
    task_id :
        Id of the task that owns this object
        (default = HYSOP_DEFAULT_TASK_ID).
    rank :
        comm.Get_rank() of the current process, MPI.PROC_NULL when on_task
        is False, MPI.UNDEFINED when comm is MPI.COMM_NULL.
    on_task :
        True if the task_id of the object corresponds to the task_id of the
        current process.

    This struct is useful for operators : each operator has
    a MPIParams attribute to save its mpi settings.

    Equality and hashing only consider task_id, on_task and the *identity*
    of comm; rank and size are derived values and are ignored.

    Examples
    ---------
    op = SomeOperator(..., task_id=1)
    if op.is_on_task():
        ...

    'is_on_task' will return MPIParams.on_task value for op
    and tell if the current operator belongs to the current process
    mpi task.
    """

    def __new__(
        cls, comm=main_comm, task_id=HYSOP_DEFAULT_TASK_ID, rank=main_rank, on_task=True
    ):
        """
        Build the parameter tuple, deriving rank and size from comm.

        The ``rank`` argument is accepted for backward compatibility only:
        its value is always overwritten by one of the branches below.
        """
        if not on_task:
            # Not on this task: neutralise the communicator altogether.
            comm, rank, size = MPI.COMM_NULL, MPI.PROC_NULL, -1
        elif comm != MPI.COMM_NULL:
            rank, size = comm.Get_rank(), comm.Get_size()
        else:
            # On task but without a usable communicator.
            rank, size = MPI.UNDEFINED, -1
        return super().__new__(cls, comm, size, task_id, rank, on_task)

    def diff(self, other):
        """
        Return a dict describing the fields on which self and other differ.

        Only task_id, on_task and comm (compared by identity) are inspected.
        Each entry maps the field name to a ``(self_value, other_value)``
        pair; an empty dict means both objects compare equal.
        """
        d = {}
        if self.task_id != other.task_id:
            d["task_id"] = (self.task_id, other.task_id)
        if self.on_task != other.on_task:
            # Store both values, like every other entry of this dict.
            d["on_task"] = (self.on_task, other.on_task)
        if self.comm is not other.comm:
            d["comm"] = (self.comm, other.comm)
        return d

    def __eq__(self, other):
        """Equal when task_id and on_task match and comm is the same object."""
        if self.__class__ != other.__class__:
            return NotImplemented
        return (
            self.task_id == other.task_id
            and self.on_task == other.on_task
            and self.comm is other.comm
        )

    def __ne__(self, other):
        """Negation of __eq__, propagating NotImplemented for foreign types."""
        result = self.__eq__(other)
        if result is NotImplemented:
            # `not NotImplemented` would evaluate to False and wrongly report
            # that objects of unrelated types are equal.
            return result
        return not result

    def __hash__(self):
        """Hash consistent with __eq__: task_id, on_task and identity of comm."""
        h = hashlib.sha1()
        h.update(str(self.task_id).encode("utf-8"))
        h.update(str(self.on_task).encode("utf-8"))
        return hash(h.hexdigest()) ^ id(self.comm)
[docs]
class CartesianDiscretization(
    namedtuple(
        "CartesianDiscretization",
        ["resolution", "ghosts", "lboundaries", "rboundaries"],
    )
):
    """
    A struct to handle discretization parameters:

    - a resolution (either a list of int or a numpy array of int)
      resolution is GRID_RESOLUTION. GLOBAL_RESOLUTION is
      GRID_RESOLUTION + PERIODICITY.
    - number of points in the ghost-layer. One value per direction, list
      or array. Default = None (ie. no ghosts, stored as an array of zeros).
    - global boundary conditions that should be prescribed on the left and
      the right of the box shaped domain for each axis. They must be given
      together or not at all. When omitted they are stored as None, unless
      default_boundaries=True is passed, in which case periodic boundary
      conditions are set everywhere.
    """

    def __new__(
        cls,
        resolution,
        ghosts=None,
        lboundaries=None,
        rboundaries=None,
        default_boundaries=False,
    ):
        """
        Normalise and validate the parameters, then build the tuple.

        Parameters
        ----------
        resolution :
            Grid resolution, converted with npw.asdimarray.
        ghosts :
            Ghost layer size per direction (non-negative, same size as
            resolution) or None for no ghosts.
        lboundaries, rboundaries :
            Left/right boundary conditions; either both None or both given.
            Checked with check_instance (object ndarray of BoundaryCondition
            values, same size as resolution).
        default_boundaries : bool
            When True, lboundaries and rboundaries must be None and are
            filled with BoundaryCondition.PERIODIC.

        Raises
        ------
        AssertionError
            When one of the consistency checks performed here fails.
        """
        # Boundaries come in pairs: both specified or both left out.
        assert not ((lboundaries is None) ^ (rboundaries is None))

        resolution = npw.asdimarray(resolution)
        if ghosts is None:
            ghosts = npw.integer_zeros(resolution.size)
        else:
            ghosts = npw.asintegerarray(ghosts)
            msg = "Dimensions of resolution and ghosts parameters"
            msg += " are not complient."
            assert ghosts.size == resolution.size, msg
            assert all(ghosts >= 0)

        if default_boundaries:
            assert lboundaries is None
            assert rboundaries is None
            lboundaries = npw.empty(shape=(resolution.size,), dtype=object)
            lboundaries[...] = BoundaryCondition.PERIODIC
            rboundaries = lboundaries.copy()

        for boundaries in (lboundaries, rboundaries):
            check_instance(
                boundaries,
                npw.ndarray,
                dtype=object,
                size=resolution.size,
                values=BoundaryCondition,
                allow_none=True,
            )

        npw.set_readonly(resolution, ghosts)
        if lboundaries is not None:
            npw.set_readonly(lboundaries, rboundaries)
        return super().__new__(cls, resolution, ghosts, lboundaries, rboundaries)

    @property
    def boundaries(self):
        """Left and right boundary conditions as a tuple.

        Raises AttributeError when no boundaries have been specified.
        """
        if self.lboundaries is None:
            raise AttributeError("boundaries have not been specified")
        return (self.lboundaries, self.rboundaries)

    @property
    def periodicity(self):
        """Result of comparing lboundaries with BoundaryCondition.PERIODIC.

        Raises AttributeError when no boundaries have been specified.
        """
        if (self.lboundaries is None) or (self.rboundaries is None):
            raise AttributeError("boundaries have not been specified")
        return self.lboundaries == BoundaryCondition.PERIODIC

    @property
    def grid_resolution(self):
        """Effective grid resolution given by user."""
        return self.resolution

    @property
    def global_resolution(self):
        """
        Logical grid resolution (grid_resolution + periodicity).
        Can only be fetched if boundaries have been specified.
        """
        return self.grid_resolution + self.periodicity

    @staticmethod
    def _field_matches(lhs, rhs):
        """Compare two fields that are either None or array-like objects."""
        if lhs is None or rhs is None:
            # None only matches None (unspecified boundaries on both sides).
            return lhs is rhs
        # Guard on the shape first: elementwise comparison of arrays with
        # incompatible shapes is an error or a scalar depending on the
        # NumPy version.
        return lhs.shape == rhs.shape and bool((lhs == rhs).all())

    def __eq__(self, other):
        """Field-by-field equality; unspecified boundaries only match each other."""
        if self.__class__ != other.__class__:
            return NotImplemented
        return all(
            self._field_matches(mine, theirs)
            for mine, theirs in (
                (self.resolution, other.resolution),
                (self.ghosts, other.ghosts),
                (self.lboundaries, other.lboundaries),
                (self.rboundaries, other.rboundaries),
            )
        )

    def __ne__(self, other):
        """Negation of __eq__, propagating NotImplemented for foreign types."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __str__(self):
        """Multi-line human readable summary of the discretization."""
        # Test against None explicitly: the truth value of a multi-element
        # array is ambiguous and raises ValueError.
        lbd = None if self.lboundaries is None else self.lboundaries.tolist()
        rbd = None if self.rboundaries is None else self.rboundaries.tolist()
        s = "Cartesian discretization:"
        s += f"\n *resolution: {self.resolution}"
        s += f"\n *ghosts: {self.ghosts}"
        s += f"\n *lboundaries: {lbd}"
        s += f"\n *rboundaries: {rbd}"
        return s

    def __hash__(self):
        """Hash built from the raw bytes of resolution/ghosts and the boundaries."""
        h = hashlib.sha1()
        h.update(self.resolution.view(npw.uint8))
        h.update(self.ghosts.view(npw.uint8))
        if self.lboundaries is not None:
            h.update(
                str(hash(tuple(int(bd) for bd in self.lboundaries))).encode("utf-8")
            )
        if self.rboundaries is not None:
            h.update(
                str(hash(tuple(int(bd) for bd in self.rboundaries))).encode("utf-8")
            )
        return hash(h.hexdigest())